common-websocket 模块使用
什么是 WebSocket
WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,使得客户端和服务器之间可以实时双向传输数据。
核心特性
- 心跳保活机制: 前端每 30 秒发送
ping 心跳,后端响应 pong,确保连接活性
- 自动重连: 连接断开后自动重连(默认最多 6 次,间隔 5 秒)
- 分布式支持: 通过 Redis Pub/Sub 实现多实例消息分发
- 用户级推送: 基于用户 ID 精准推送消息
- 广播功能: 支持全员广播消息
前端配置开启
开启 WebSocket 转发
在网关配置中开启 WebSocket 转发功能:
引入 WebSocket 组件
在主页面(如 App.vue 或 Layout.vue)引入 WebSocket 组件:
<template>
<!-- 其他页面内容 -->
<global-websocket
uri="/admin/ws/info"
v-if="websocketEnable"
@rollback="rollback"
/>
</template>
<script setup lang="ts">
import { defineAsyncComponent } from 'vue';
// 异步加载 WebSocket 组件
const GlobalWebsocket = defineAsyncComponent(
() => import("/@/components/Websocket/index.vue")
);
// 控制是否启用 WebSocket (可从配置中读取)
const websocketEnable = ref(true);
// 接收到消息的回调函数
const rollback = (msg: string) => {
console.log('收到 WebSocket 消息:', msg);
// 方式1: 使用消息存储 (如示例)
useMsg().setMsg({
label: "websocket消息",
value: msg,
time: formatAxis(new Date()),
});
// 方式2: 自定义业务处理
// handleBusinessMessage(msg);
};
</script>
组件参数说明
| 参数 | 类型 | 必填 | 说明 |
|---|
| uri | String | 是 | WebSocket 连接地址(相对路径) |
| @rollback | Function | 否 | 接收到消息时的回调函数 |
连接机制详解
连接 URL 格式:
ws://[域名][baseURL][uri]?access_token=[token]&TENANT-ID=[tenantId]
或
wss://[域名][baseURL][uri]?access_token=[token]&TENANT-ID=[tenantId]
- 协议自动选择: HTTPS 页面使用
wss://,HTTP 页面使用 ws://
- 认证参数: 自动携带用户 token 和租户 ID
- 完整示例:
ws://localhost:8888/api/admin/ws/info?access_token=xxx&TENANT-ID=1
心跳与重连配置
前端组件内置了以下配置(位于 pigx-ui-pro/src/components/Websocket/index.vue):
const state = reactive({
lockReconnect: false, // 重连锁,避免多次重连
maxReconnect: 6, // 最大重连次数 (-1 表示无限重连)
reconnectTime: 0, // 当前重连尝试次数
heartbeat: {
interval: 30 * 1000, // 心跳间隔: 30秒
timeout: 10 * 1000, // 心跳超时: 10秒
pingMessage: JSON.stringify({ type: 'ping' }), // 心跳消息格式
},
});
心跳流程:
- 连接建立后,每隔 30 秒发送
{"type":"ping"} 消息
- 后端收到后立即返回
{"type":"pong"} 响应
- 如果 10 秒内未收到响应,则认为连接断开,触发重连
- 收到任何消息(包括 pong)都会重置心跳定时器
重连流程:
- 检测到连接断开(
onclose/onerror)
- 等待 5 秒后尝试重连
- 重连成功后重置
reconnectTime 计数器
- 达到最大重连次数后停止尝试
后端配置与使用
引入依赖
在需要使用 WebSocket 的微服务模块的 pom.xml 中添加依赖:
<!--websocket 支持-->
<dependency>
<groupId>com.pig4cloud</groupId>
<artifactId>pigx-common-websocket</artifactId>
</dependency>
配置参数
在 application.yml 或 Nacos 配置中心添加以下配置(均为可选,有默认值):
pigx:
websocket:
path: /ws/info # WebSocket 端点路径 (默认: /ws/info)
allow-origins: "*" # 允许的跨域源 (默认: *)
heartbeat: true # 是否启用心跳 (默认: true)
map-session: true # 是否启用 Session 映射 (默认: true)
message-distributor: redis # 消息分发器类型: local | redis (默认: local)
send-time-limit: 10000 # 发送时间限制,单位毫秒 (默认: 10000)
send-buffer-size-limit: 64000 # 发送缓冲区大小限制 (默认: 64000)
配置项说明:
| 配置项 | 类型 | 默认值 | 说明 |
|---|
| path | String | /ws/info | WebSocket 连接路径 |
| allow-origins | String | * | CORS 允许的源 |
| heartbeat | Boolean | true | 是否启用心跳检测 |
| map-session | Boolean | true | 是否记录用户 Session 映射关系 |
| message-distributor | String | local | 消息分发模式:• local: 单机模式,直接发送• redis: 分布式模式,通过 Redis Pub/Sub 分发 |
| send-time-limit | Integer | 10000 | WebSocket 消息发送超时时间(毫秒) |
| send-buffer-size-limit | Integer | 64000 | WebSocket 发送缓冲区大小限制 |
服务端向客户端发送消息
发送消息给指定用户
import com.pig4cloud.pigx.common.core.util.SpringContextHolder;
import com.pig4cloud.pigx.common.websocket.distribute.MessageDO;
import com.pig4cloud.pigx.common.websocket.distribute.MessageDistributor;
import cn.hutool.core.collection.CollUtil;
// 获取消息分发器 (自动根据配置注入 LocalMessageDistributor 或 RedisMessageDistributor)
MessageDistributor messageDistributor = SpringContextHolder.getBean(MessageDistributor.class);
// 构建消息对象
MessageDO messageDO = new MessageDO();
messageDO.setNeedBroadcast(Boolean.FALSE); // 非广播模式
messageDO.setSessionKeys(CollUtil.newArrayList(1, 2, 3)); // 目标用户 ID 列表
messageDO.setMessageText("您有一条新的待办事项,请及时处理"); // 消息内容
// 发送消息
messageDistributor.distribute(messageDO);
关键说明:
sessionKeys: 接收消息的用户 ID 列表(对应数据库中的用户 ID)
needBroadcast: 设置为 false 表示定向发送
messageText: 发送的文本消息内容
广播消息给所有在线用户
// 方式1: 使用 MessageDO 手动构建
MessageDO messageDO = new MessageDO();
messageDO.setNeedBroadcast(Boolean.TRUE); // 广播模式
messageDO.setMessageText("系统将于 10 分钟后进行维护,请及时保存数据");
messageDistributor.distribute(messageDO);
// 方式2: 使用便捷方法
MessageDO broadcastMsg = MessageDO.broadcastMessage("系统维护通知: 服务将在 10 分钟后重启");
messageDistributor.distribute(broadcastMsg);
实际业务场景示例
场景1: 工作流审批通知
@Service
@RequiredArgsConstructor
public class FlowNotifyService {
private final MessageDistributor messageDistributor;
/**
* 发送审批通知
*/
public void sendApprovalNotice(Long userId, String taskName) {
MessageDO messageDO = new MessageDO();
messageDO.setNeedBroadcast(Boolean.FALSE);
messageDO.setSessionKeys(CollUtil.newArrayList(userId));
messageDO.setMessageText(String.format("您有新的审批任务【%s】待处理", taskName));
messageDistributor.distribute(messageDO);
}
}
场景2: 批量通知多个用户
/**
* 通知项目成员
*/
public void notifyProjectMembers(List<Long> memberIds, String message) {
MessageDO messageDO = new MessageDO();
messageDO.setNeedBroadcast(Boolean.FALSE);
messageDO.setSessionKeys(new ArrayList<>(memberIds)); // 批量用户ID
messageDO.setMessageText(message);
messageDistributor.distribute(messageDO);
}
场景3: 系统公告
/**
* 发布系统公告
*/
public void publishSystemAnnouncement(String announcement) {
// 广播给所有在线用户
MessageDO messageDO = MessageDO.broadcastMessage(announcement);
messageDistributor.distribute(messageDO);
}
服务端接收客户端发送的消息
默认行为
common-websocket 模块默认只会将客户端发送的消息输出到日志,不做任何业务处理:
// CustomPlanTextMessageHandler.java (默认实现)
@Override
public void handle(WebSocketSession session, String message) {
log.info("sessionId {} ,msg {}", session.getId(), message);
}
自定义消息处理
如果需要处理客户端发送的消息,可以实现 PlanTextMessageHandler 接口:
import com.pig4cloud.pigx.common.websocket.handler.PlanTextMessageHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketSession;
@Slf4j
@Component
public class BizPlanTextMessageHandler implements PlanTextMessageHandler {
@Override
public void handle(WebSocketSession session, String message) {
log.info("收到客户端消息 - SessionId: {}, Message: {}", session.getId(), message);
// 获取用户信息 (从 session attributes 中)
Object userAttr = session.getAttributes().get("USER_KEY_ATTR_NAME");
if (userAttr instanceof PigxUser) {
PigxUser user = (PigxUser) userAttr;
log.info("发送消息的用户ID: {}, 用户名: {}", user.getId(), user.getUsername());
}
// 示例: 解析消息并处理
try {
// 假设客户端发送 JSON 格式消息
JSONObject jsonMessage = JSONUtil.parseObj(message);
String type = jsonMessage.getStr("type");
String content = jsonMessage.getStr("content");
// 根据消息类型处理业务逻辑
switch (type) {
case "chat":
// 处理聊天消息
handleChatMessage(session, content);
break;
case "command":
// 处理指令消息
handleCommand(session, content);
break;
default:
log.warn("未知消息类型: {}", type);
}
} catch (Exception e) {
log.error("处理 WebSocket 消息异常", e);
}
}
private void handleChatMessage(WebSocketSession session, String content) {
// 实现聊天消息处理逻辑
log.info("处理聊天消息: {}", content);
}
private void handleCommand(WebSocketSession session, String content) {
// 实现指令处理逻辑
log.info("处理命令: {}", content);
}
}
Session Key 生成规则
PigX 默认使用用户 ID 作为 Session 唯一标识(PigxSessionKeyGenerator):
@Override
public Object sessionKey(WebSocketSession webSocketSession) {
Object obj = webSocketSession.getAttributes().get("USER_KEY_ATTR_NAME");
if (obj instanceof PigxUser) {
PigxUser user = (PigxUser) obj;
// 使用用户 ID 作为唯一标识
return String.valueOf(user.getId());
}
return null;
}
自定义 Session Key 规则
如果需要自定义(如使用 租户ID+用户ID 组合),可以实现 SessionKeyGenerator 接口:
@Component
public class CustomSessionKeyGenerator implements SessionKeyGenerator {
@Override
public Object sessionKey(WebSocketSession webSocketSession) {
Object obj = webSocketSession.getAttributes().get("USER_KEY_ATTR_NAME");
if (obj instanceof PigxUser) {
PigxUser user = (PigxUser) obj;
// 自定义规则: 租户ID + 用户ID
return user.getTenantId() + "_" + user.getId();
}
return null;
}
}
架构建议